{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"[](https://github.com/aws/aws-sdk-pandas)\n",
"\n",
"# 6 - Amazon Athena\n",
"\n",
"[awswrangler](https://github.com/aws/aws-sdk-pandas) has three ways to run queries on Athena and fetch the result as a DataFrame:\n",
"\n",
"- **ctas_approach=True** (Default)\n",
"\n",
" Wraps the query with a CTAS and then reads the table data as parquet directly from s3.\n",
" \n",
" * `PROS`:\n",
" - Faster for mid and big result sizes.\n",
" - Can handle some level of nested types.\n",
" * `CONS`:\n",
" - Requires create/delete table permissions on Glue.\n",
" - Does not support timestamp with time zone\n",
" - Does not support columns with repeated names.\n",
" - Does not support columns with undefined data types.\n",
" - A temporary table will be created and then deleted immediately.\n",
" - Does not support custom data_source/catalog_id.\n",
"\n",
"- **unload_approach=True and ctas_approach=False**\n",
"\n",
" Does an UNLOAD query on Athena and parse the Parquet result on s3.\n",
"\n",
" * `PROS`:\n",
" - Faster for mid and big result sizes.\n",
" - Can handle some level of nested types.\n",
" - Does not modify Glue Data Catalog.\n",
" * `CONS`:\n",
" - Output S3 path must be empty.\n",
" - Does not support timestamp with time zone\n",
" - Does not support columns with repeated names.\n",
" - Does not support columns with undefined data types.\n",
"\n",
"- **ctas_approach=False**\n",
"\n",
" Does a regular query on Athena and parse the regular CSV result on s3.\n",
" \n",
" * `PROS`:\n",
" - Faster for small result sizes (less latency).\n",
" - Does not require create/delete table permissions on Glue\n",
" - Supports timestamp with time zone.\n",
" - Support custom data_source/catalog_id.\n",
" * `CONS`:\n",
" - Slower (But stills faster than other libraries that uses the regular Athena API)\n",
" - Does not handle nested types at all."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import awswrangler as wr"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Enter your bucket name:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import getpass\n",
"\n",
"bucket = getpass.getpass()\n",
"path = f\"s3://{bucket}/data/\""
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Checking/Creating Glue Catalog Databases"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"if \"awswrangler_test\" not in wr.catalog.databases().values:\n",
" wr.catalog.create_database(\"awswrangler_test\")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### Creating a Parquet Table from the NOAA's CSV files\n",
"\n",
"[Reference](https://registry.opendata.aws/noaa-ghcn/)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"is_executing": true
}
},
"outputs": [],
"source": [
"cols = [\"id\", \"dt\", \"element\", \"value\", \"m_flag\", \"q_flag\", \"s_flag\", \"obs_time\"]\n",
"\n",
"df = wr.s3.read_csv(\n",
" path=\"s3://noaa-ghcn-pds/csv/by_year/189\", names=cols, parse_dates=[\"dt\", \"obs_time\"]\n",
") # Read 10 files from the 1890 decade (~1GB)\n",
"\n",
"df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"is_executing": true
}
},
"outputs": [],
"source": [
"wr.s3.to_parquet(df=df, path=path, dataset=True, mode=\"overwrite\", database=\"awswrangler_test\", table=\"noaa\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"is_executing": true
}
},
"outputs": [],
"source": [
"wr.catalog.table(database=\"awswrangler_test\", table=\"noaa\")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Reading with ctas_approach=False"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"is_executing": true
}
},
"outputs": [],
"source": [
"%%time\n",
"\n",
"wr.athena.read_sql_query(\"SELECT * FROM noaa\", database=\"awswrangler_test\", ctas_approach=False)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Default with ctas_approach=True - 13x faster (default)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"is_executing": true
}
},
"outputs": [],
"source": [
"%%time\n",
"\n",
"wr.athena.read_sql_query(\"SELECT * FROM noaa\", database=\"awswrangler_test\")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Using categories to speed up and save memory - 24x faster"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"is_executing": true
}
},
"outputs": [],
"source": [
"%%time\n",
"\n",
"wr.athena.read_sql_query(\n",
" \"SELECT * FROM noaa\",\n",
" database=\"awswrangler_test\",\n",
" categories=[\"id\", \"dt\", \"element\", \"value\", \"m_flag\", \"q_flag\", \"s_flag\", \"obs_time\"],\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Reading with unload_approach=True"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"is_executing": true,
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"%%time\n",
"\n",
"wr.athena.read_sql_query(\n",
" \"SELECT * FROM noaa\",\n",
" database=\"awswrangler_test\",\n",
" ctas_approach=False,\n",
" unload_approach=True,\n",
" s3_output=f\"s3://{bucket}/unload/\",\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Batching (Good for restricted memory environments)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"is_executing": true
}
},
"outputs": [],
"source": [
"%%time\n",
"\n",
"dfs = wr.athena.read_sql_query(\n",
" \"SELECT * FROM noaa\",\n",
" database=\"awswrangler_test\",\n",
" chunksize=True, # Chunksize calculated automatically for ctas_approach.\n",
")\n",
"\n",
"for df in dfs: # Batching\n",
" print(len(df.index))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"is_executing": true
}
},
"outputs": [],
"source": [
"%%time\n",
"\n",
"dfs = wr.athena.read_sql_query(\"SELECT * FROM noaa\", database=\"awswrangler_test\", chunksize=100_000_000)\n",
"\n",
"for df in dfs: # Batching\n",
" print(len(df.index))"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parameterized queries"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### Client-side parameter resolution\n",
"\n",
"The `params` parameter allows client-side resolution of parameters, which are specified with `:col_name`, when `paramstyle` is set to `named`.\n",
"Additionally, Python types will map to the appropriate Athena definitions.\n",
"For example, the value `dt.date(2023, 1, 1)` will resolve to `DATE '2023-01-01`.\n",
"\n",
"For the example below, the following query will be sent to Athena:\n",
"```sql\n",
"SELECT * FROM noaa WHERE S_FLAG = 'E'\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"wr.athena.read_sql_query(\n",
" \"SELECT * FROM noaa WHERE S_FLAG = :flag_value\",\n",
" database=\"awswrangler_test\",\n",
" params={\n",
" \"flag_value\": \"E\",\n",
" },\n",
")"
]
},
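{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"The type mapping described above can be sketched with a date filter.\n",
"This is a minimal illustration, not part of the original walkthrough: the parameter name `start_date` and the cutoff date are assumptions chosen for the example, and it assumes the `dt` column is stored as a date or timestamp type (as `parse_dates` should have produced when the table was written).\n",
"With the `named` paramstyle, the `datetime.date` value is expected to be rendered as `DATE '1895-01-01'` before the query is sent to Athena."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import datetime\n",
"\n",
"# Hypothetical filter: keep observations from 1895 onwards.\n",
"# `start_date` is an illustrative parameter name, not defined elsewhere in this tutorial.\n",
"wr.athena.read_sql_query(\n",
"    \"SELECT * FROM noaa WHERE dt >= :start_date\",\n",
"    database=\"awswrangler_test\",\n",
"    params={\"start_date\": datetime.date(1895, 1, 1)},\n",
"    paramstyle=\"named\",\n",
")"
]
},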
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"### Server-side parameter resolution\n",
"\n",
"Alternatively, Athena supports server-side parameter resolution when `paramstyle` is defined as `qmark`.\n",
"The SQL statement sent to Athena will not contain the values passed in `params`.\n",
"Instead, they will be passed as part of a separate `params` parameter in `boto3`.\n",
"\n",
"The downside of using this approach is that types aren't automatically resolved.\n",
"The values sent to `params` must be strings.\n",
"Therefore, if one of the values is a date, the value passed in `params` has to be `DATE 'XXXX-XX-XX'`.\n",
"\n",
"The upside, however, is that these parameters can be used with prepared statements.\n",
"\n",
"For more information, see \"[Using parameterized queries](https://docs.aws.amazon.com/athena/latest/ug/querying-with-prepared-statements.html)\"."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"wr.athena.read_sql_query(\n",
" \"SELECT * FROM noaa WHERE S_FLAG = ?\",\n",
" database=\"awswrangler_test\",\n",
" params=[\"E\"],\n",
" paramstyle=\"qmark\",\n",
")"
]
},
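{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"As a sketch of the string requirement described above, a date filter under `qmark` passes the full Athena literal as text.\n",
"The cutoff date is an assumption chosen for illustration, and the example assumes `dt` is stored as a date or timestamp column.\n",
"No Python-side type conversion is expected here, so the `DATE '...'` wrapper has to be written out by hand."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hypothetical filter: the date is written as an Athena literal inside a plain string,\n",
"# because values passed with paramstyle=\"qmark\" must be strings.\n",
"wr.athena.read_sql_query(\n",
"    \"SELECT * FROM noaa WHERE dt >= ?\",\n",
"    database=\"awswrangler_test\",\n",
"    params=[\"DATE '1895-01-01'\"],\n",
"    paramstyle=\"qmark\",\n",
")"
]
},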
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prepared statements"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"wr.athena.create_prepared_statement(\n",
" sql=\"SELECT * FROM noaa WHERE S_FLAG = ?\",\n",
" statement_name=\"statement\",\n",
")\n",
"\n",
"# Resolve parameter using Athena execution parameters\n",
"wr.athena.read_sql_query(\n",
" sql=\"EXECUTE statement\",\n",
" database=\"awswrangler_test\",\n",
" params=[\"E\"],\n",
" paramstyle=\"qmark\",\n",
")\n",
"\n",
"# Resolve parameter using Athena execution parameters (same effect as above)\n",
"wr.athena.read_sql_query(\n",
" sql=\"EXECUTE statement USING ?\",\n",
" database=\"awswrangler_test\",\n",
" params=[\"E\"],\n",
" paramstyle=\"qmark\",\n",
")\n",
"\n",
"# Resolve parameter using client-side formatter\n",
"wr.athena.read_sql_query(\n",
" sql=\"EXECUTE statement USING :flag_value\",\n",
" database=\"awswrangler_test\",\n",
" params={\n",
" \"flag_value\": \"E\",\n",
" },\n",
" paramstyle=\"named\",\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Clean up prepared statement\n",
"wr.athena.delete_prepared_statement(statement_name=\"statement\")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Cleaning Up S3"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"is_executing": true
}
},
"outputs": [],
"source": [
"wr.s3.delete_objects(path)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Delete table"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"is_executing": true
}
},
"outputs": [],
"source": [
"wr.catalog.delete_table_if_exists(database=\"awswrangler_test\", table=\"noaa\")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Delete Database"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"is_executing": true
}
},
"outputs": [],
"source": [
"wr.catalog.delete_database(\"awswrangler_test\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.9.14",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.13"
}
},
"nbformat": 4,
"nbformat_minor": 4
}